package com.gearbox.core.task;

import com.gearbox.core.cache.NodeIdleTimeCache;
import com.gearbox.core.cache.UserWaitingJobCache;
import com.gearbox.core.configuration.SystemConfiguration;
import com.gearbox.core.configuration.TaskConfiguration;
import com.gearbox.core.model.Job;
import com.gearbox.core.model.Node;
import com.gearbox.core.service.NodeService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Periodic maintenance for the in-memory scheduling caches.
 *
 * <p>Two independent jobs run with a fixed delay taken from the
 * {@code task.refresh-cache-period} property (interpreted in seconds), each
 * starting 5 seconds after application start-up:
 * <ul>
 *   <li>{@link #refreshIdleTimeCache()} keeps {@link NodeIdleTimeCache} in step with the
 *       nodes currently reported by {@link NodeService#listUnstableIdleNodes()};</li>
 *   <li>{@link #refreshWaitingJobCache()} prunes {@link UserWaitingJobCache}.</li>
 * </ul>
 *
 * <p>Both jobs log and swallow failures so that one bad run never propagates out of the
 * scheduled method.
 */
@Component
public class CacheRefreshTask {
    private static final Logger LOG = LoggerFactory.getLogger(CacheRefreshTask.class);

    @Autowired
    private NodeService nodeService;

    @Autowired
    private NodeIdleTimeCache nodeIdleTimeCache;

    @Autowired
    private UserWaitingJobCache userWaitingJobCache;

    @Autowired
    private SystemConfiguration systemConfiguration;

    @Autowired
    private TaskConfiguration taskConfiguration;

    /**
     * Refreshes the node idle-time cache: bumps the idle time of every node returned by
     * {@link NodeService#listUnstableIdleNodes()} and then drops cache entries for nodes
     * that are no longer in that list.
     *
     * <p>Any failure is logged and swallowed; the next scheduled run starts from scratch.
     */
    @Scheduled(initialDelay = 5L, fixedDelayString = "${task.refresh-cache-period}", timeUnit = TimeUnit.SECONDS)
    void refreshIdleTimeCache() {
        try {
            LOG.info("refreshIdleTimeCache begin");
            List<Node> idleNodeList = nodeService.listUnstableIdleNodes();
            LOG.info("list idle nodes end, nodes={}", idleNodeList);
            increaseNodeIdleTimeInCache(idleNodeList);
            clearNodeIdleTimeCache(idleNodeList);
            LOG.info("clear idleTime cache end, left nodes={}", nodeIdleTimeCache.getAllNodeNames());
            LOG.info("refreshIdleTimeCache end");
        } catch (Throwable e) {
            // Scheduler boundary: deliberately broad so a failed run is only logged.
            LOG.error("refreshIdleTimeCache error", e);
        }
    }

    /**
     * Calls {@link NodeIdleTimeCache#increaseIdleTime} once for every node in the list,
     * passing the node name, the configured refresh period and the configured scale-in time.
     *
     * <p>NOTE(review): presumably the period is the increment and the scale-in time an upper
     * bound — confirm against {@code NodeIdleTimeCache}.
     *
     * @param idleNodeList nodes currently considered idle; must not be {@code null}
     */
    void increaseNodeIdleTimeInCache(List<Node> idleNodeList) {
        idleNodeList.forEach(
            node -> nodeIdleTimeCache.increaseIdleTime(node.getName(), taskConfiguration.getRefreshCachePeriod(),
                systemConfiguration.getScaleInTime()));
    }

    /**
     * Removes from the idle-time cache every node name that is not present in the given list.
     *
     * @param idleNodeList nodes currently considered idle; must not be {@code null}
     */
    void clearNodeIdleTimeCache(List<Node> idleNodeList) {
        Set<String> idleNodeNameSet = idleNodeList.stream().map(Node::getName).collect(Collectors.toSet());
        // Collect the stale names first and remove afterwards: getAllNodeNames() may be a live
        // view of the cache, and removing entries while still streaming over such a view can
        // throw ConcurrentModificationException.
        nodeIdleTimeCache.getAllNodeNames()
            .stream()
            .filter(nodeName -> !idleNodeNameSet.contains(nodeName))
            .collect(Collectors.toList())
            .forEach(nodeIdleTimeCache::remove);
    }

    /**
     * Prunes the waiting-job cache: users without any cached job are removed, and for the
     * remaining users every job is handed to
     * {@link UserWaitingJobCache#removeJobIfNotUpdateLonging}.
     *
     * <p>NOTE(review): judging by its name that call evicts jobs that have not been updated
     * for a long time — confirm against {@code UserWaitingJobCache}.
     *
     * <p>Any failure is logged and swallowed, matching {@link #refreshIdleTimeCache()}.
     */
    @Scheduled(initialDelay = 5L, fixedDelayString = "${task.refresh-cache-period}", timeUnit = TimeUnit.SECONDS)
    void refreshWaitingJobCache() {
        try {
            // Iterate over snapshots: both the user set and the per-user job list may be live
            // views of the cache that is being modified inside the loop.
            List<String> users = userWaitingJobCache.getUsers().stream().collect(Collectors.toList());
            for (String user : users) {
                List<Job> userJobs = userWaitingJobCache.getJobsByUser(user);
                if (userJobs == null || userJobs.isEmpty()) {
                    userWaitingJobCache.removeUser(user);
                    continue;
                }
                userJobs.stream()
                    .collect(Collectors.toList())
                    .forEach(userWaitingJobCache::removeJobIfNotUpdateLonging);
            }
            LOG.info("refresh waiting jobs cache end, left users is {}", userWaitingJobCache.getUsers());
        } catch (Throwable e) {
            // Scheduler boundary: deliberately broad so a failed run is only logged.
            LOG.error("refreshWaitingJobCache error", e);
        }
    }
}
